-
Notifications
You must be signed in to change notification settings - Fork 182
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Completable#mergeDelayError(Publisher) and HTTP client control flow #1336
Completable#mergeDelayError(Publisher) and HTTP client control flow #1336
Conversation
Motivation: The HTTP client control flow merges the resule of the write Publisher (a Completable) and the read Publisher via Completable#merge(Publisher). However when offloading is enabled it is possible events will be delivered out of order which may result in dropped data/trailers. Modifications: - Introduce the Completable#mergeDelayError(Publisher) operator - Use new mergeDelayError(..) operator instead of merge(..) so that both streams must complete which ensures all the read data is delivered to the application before any errors. It is assumed that if the write Publisher fails that the read publisher will also fail due to transport failure. Result: No more dropped data/trailres on HTTP client due to write failure (e.g. connection closure) being delivered before read data due to offloading (e.g. not enough demand, subscription is offloaded, demand arrives after write error is already delivered).
build failure attributed to #999 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🎉 for more operators!
assertFalse(subscription.isCancelled()); | ||
publisher.onError(DELIBERATE_EXCEPTION); | ||
completable.verifyCancelled(); | ||
assertTrue(cancellable.isCancelled()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What is the expectation for cancellable
if the completable
already completed before publisher.onError(DELIBERATE_EXCEPTION)
? Spec says that after a terminal event the subscription must be considered cancelled, is it ok if we cancel it explicitly anyway (even if it's already terminated)?
Same question for publisher
: if publisher
already terminated but then completable
fails, should we prevent cancel
from being invoked on the publisher?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
#1334 provides finer control here but there is still a race condition (two subscribers may be on different threads) that I don't think we can avoid. In practice I think it is OK as the subscription is cancelled after termination which means any interactions should be "NOPs".
offloadedSubscriber.onError(terminalSignal.cause); | ||
} | ||
} else { | ||
offloadedSubscriber.onError(currState.cause); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looking at CompletableMergeSubscriber
, there we add the 2nd error as suppressed for the 1st one, but here we propagate only the first failure. The 2nd may also be useful, consider propagating both.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'll add it but I'm hesitant to use addSuppressed
in general as it may result in memory leak with static exceptions, difficult to "turn off" and limit the queue size ... but we can look at this issue more generally in a followup.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
actually unit tests remind me of another issue with addSuppressed
, self-suppression throws an exception so we would have to guard against this everywhere (which we don't). I'll skip addSuppressed
for now...
@@ -417,12 +423,13 @@ public void clientCloseTwoPipelinedRequestsSentFirstInitiatesClosure() throws Ex | |||
responseReceived.countDown(); | |||
}); | |||
// Send another request before connection receives a response for the first request: | |||
assertClosedChannelException("/second"); | |||
Future<StreamingHttpResponse> secondFuture = sendZeroLengthRequest("/second"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1, the change is good enough for these tests
@@ -67,7 +200,6 @@ void handleSubscribe(final Subscriber<? super T> subscriber, final SignalOffload | |||
// invoke it from the mergeWith Publisher Executor thread. | |||
this.offloadedSubscriber = new ConcurrentTerminalSubscriber<>(signalOffloader.offloadSubscriber( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We use ConcurrentSubscription
and ConcurrentTerminalSubscriber
to handle concurrency in this class. The ConcurrentSubscription
has a comment clarifying why it's required. Consider adding a similar comment for ConcurrentTerminalSubscriber
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ConcurrentTerminalSubscriber
usage isn't changed in this PR and it is removed all together in #1334 so lets defer on this for now.
…pple#1336) Motivation: The HTTP client control flow merges the resule of the write Publisher (a Completable) and the read Publisher via Completable#merge(Publisher). However when offloading is enabled it is possible events will be delivered out of order which may result in dropped data/trailers. Modifications: - Introduce the Completable#mergeDelayError(Publisher) operator - Use new mergeDelayError(..) operator instead of merge(..) so that both streams must complete which ensures all the read data is delivered to the application before any errors. It is assumed that if the write Publisher fails that the read publisher will also fail due to transport failure. Result: No more dropped data/trailres on HTTP client due to write failure (e.g. connection closure) being delivered before read data due to offloading (e.g. not enough demand, subscription is offloaded, demand arrives after write error is already delivered).
Motivation:
The HTTP client control flow merges the resule of the write Publisher
(a Completable) and the read Publisher via Completable#merge(Publisher).
However when offloading is enabled it is possible events will be
delivered out of order which may result in dropped data/trailers.
Modifications:
streams must complete which ensures all the read data is delivered to
the application before any errors. It is assumed that if the write
Publisher fails that the read publisher will also fail due to transport
failure.
Result:
No more dropped data/trailres on HTTP client due to write failure (e.g.
connection closure) being delivered before read data due to offloading
(e.g. not enough demand, subscription is offloaded, demand arrives after
write error is already delivered).